package com.hzzftech.watchdog.busi.service.impl;

import com.hzzftech.watchdog.busi.config.KettleConfig;
import com.hzzftech.watchdog.busi.constants.BusiConstant;
import com.hzzftech.watchdog.busi.core.executor.JobExecutor;
import com.hzzftech.watchdog.busi.core.executor.TransExecutor;
import com.hzzftech.watchdog.busi.core.executor.starter.KettleEnvironmentStarter;
import com.hzzftech.watchdog.busi.core.file.repository.KtFileRepositoryFileParser;
import com.hzzftech.watchdog.busi.core.file.repository.KtGitRepositoryFileParser;
import com.hzzftech.watchdog.busi.core.util.KettleThreadPool;
import com.hzzftech.watchdog.busi.domain.*;
import com.hzzftech.watchdog.busi.encrypt.DESEncrypt;
import com.hzzftech.watchdog.busi.mapper.KtDispatcherMapper;
import com.hzzftech.watchdog.busi.service.*;
import com.hzzftech.watchdog.busi.utils.FileUtils;
import com.hzzftech.watchdog.common.core.text.Convert;
import com.hzzftech.watchdog.common.utils.ShiroUtils;
import com.hzzftech.watchdog.common.utils.StringUtils;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.vfs2.util.UserAuthenticatorUtils;
import org.pentaho.di.cluster.SlaveServer;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleMissingPluginsException;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobExecutionConfiguration;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.LongObjectId;
import org.pentaho.di.repository.RepositoriesMeta;
import org.pentaho.di.repository.Repository;
import org.pentaho.di.repository.StringObjectId;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.pentaho.di.trans.TransExecutionConfiguration;
import org.pentaho.di.trans.TransMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.*;

/**
 * 任务调度Service业务层处理
 * 
 * @author liquanxiang
 * @date 2021-12-14
 */
@Service("KtDispatcherServiceImpl")
public class KtDispatcherServiceImpl implements IKtDispatcherService 
{
    @Autowired
    private KtDispatcherMapper ktDispatcherMapper;

    /**
     * 查询任务调度
     * 
     * @param id 任务调度主键
     * @return 任务调度
     */
    @Override
    public KtDispatcher selectKtDispatcherById(Long id, String repoType)
    {
        if (BusiConstant.REPOSITORY_TYPE_GIT.equals(repoType)) {
            // git资源仓库
            return ktDispatcherMapper.selectKtDispatcherByIdG(id);
        } else if (BusiConstant.REPOSITORY_TYPE_FILE.equals(repoType)){
            // 文件资源仓库
            return ktDispatcherMapper.selectKtDispatcherByIdF(id);
        } if (BusiConstant.REPOSITORY_TYPE_DB.equals(repoType)){
            // 数据库资源仓库
            return ktDispatcherMapper.selectKtDispatcherByIdDB(id);
        } else {
            throw new RuntimeException("仓库类型不正确");
        }
    }

    @Override
    public KtDispatcher selectKtDispatcherById(Long id) {
        return ktDispatcherMapper.selectKtDispatcherById(id);
    }

    /**
     * 查询任务调度列表
     * 
     * @param ktDispatcher 任务调度
     * @return 任务调度
     */
    @Override
    public List<KtDispatcher> selectKtDispatcherList(KtDispatcher ktDispatcher, String repoType)
    {
        if (BusiConstant.REPOSITORY_TYPE_GIT.equals(repoType)) {
            // git资源仓库
            return ktDispatcherMapper.selectKtDispatcherListG(ktDispatcher);
        } else if (BusiConstant.REPOSITORY_TYPE_FILE.equals(repoType)){
            // 文件资源仓库
            return ktDispatcherMapper.selectKtDispatcherListF(ktDispatcher);
        } else if (BusiConstant.REPOSITORY_TYPE_DB.equals(repoType)) {
            // 数据库资源仓库
            return ktDispatcherMapper.selectKtDispatcherListDB(ktDispatcher);
        } else {
            throw new RuntimeException("仓库类型不正确");
        }
    }

    @Override
    public List<KtDispatcher> selectKtDispatcherList(KtDispatcher ktDispatcher) {
        return ktDispatcherMapper.selectKtDispatcherList(ktDispatcher);
    }

    /**
     * 新增任务调度
     * 
     * @param ktDispatcher 任务调度
     * @return 结果
     */
    @Override
    @Transactional
    public int insertKtDispatcher(KtDispatcher ktDispatcher)
    {
        int resultCount = 0;
        String filePath = "";
        if (BusiConstant.REPOSITORY_TYPE_GIT.equals(ktDispatcher.getDpRepoType())) {
            // git资源仓库
            // 获取dpIds
            String dpIds = ktDispatcher.getDpIds();
            for (String dpId : dpIds.split(",")) {
                Long _dpId = Long.parseLong(dpId);
                List<KtDispatcher> dpObjList = ktDispatcherMapper.selectKtDispatcherByDpId(_dpId);
                KtRepositoryFile file = fileService.selectKtRepositoryFileById(_dpId);
                KtGitRepositoryFileParser fileParser = new KtGitRepositoryFileParser(fileService, ktDispatcher.getRepositoryId(), null);
                filePath = fileParser.getFilePath(file);
                if (Objects.isNull(dpObjList) || dpObjList.size() == 0) {
                    ktDispatcher.setDpName(file.getfName().substring(0, file.getfName().length() - 4));
                } else {
                    ktDispatcher.setDpName(file.getfName().substring(0, file.getfName().length() - 4)+"("+(dpObjList.size()+1)+")");
                }

                if (BusiConstant.KETTLE_TYPE_TRANS.equals(FileUtils.getSuffixName(filePath))) {
                    ktDispatcher.setDpType(BusiConstant.KETTLE_TYPE_FLAG_TRANS);
                } else if (BusiConstant.KETTLE_TYPE_JOB.equals(FileUtils.getSuffixName(filePath))) {
                    ktDispatcher.setDpType(BusiConstant.KETTLE_TYPE_FLAG_JOB);
                } else {
                    // 文件不是ktr文件或者kjb文件
                    return -1;
                }

                KtDispatcher insertObj = new KtDispatcher();
                try {
                    BeanUtils.copyProperties(insertObj, ktDispatcher);
                    insertObj.setDpId(_dpId);
                    // 设置调用地址
                    insertObj.setDpPath(filePath);
                    ktDispatcherMapper.insertKtDispatcher(insertObj);
                } catch (Exception e) {
                    logger.error("cory ktdispatcher出错",e);
                    return -1;
                }
                resultCount ++;
            }
        } else if (BusiConstant.REPOSITORY_TYPE_FILE.equals(ktDispatcher.getDpRepoType())) {
            // 文件资源仓库
            String dpIds = ktDispatcher.getDpIds();
            for (String dpId : dpIds.split(",")) {
                Long _dpId = Long.parseLong(dpId);
                List<KtDispatcher> dpObjList = ktDispatcherMapper.selectKtDispatcherByDpId(_dpId);
                KtFileRepositoryMsg ktFileRepositoryMsg = fileRepositoryMsgService.selectKtFileRepositoryMsgById(_dpId);
                KtFileRepositoryFileParser fileParser = new KtFileRepositoryFileParser(fileRepositoryMsgService, versionService, ktDispatcher.getRepositoryId(), FILE_REPOSITORY_LOCATION);
                filePath = fileParser.getFilePath(ktFileRepositoryMsg);
                if (Objects.isNull(dpObjList) || dpObjList.size() == 0) {
                    ktDispatcher.setDpName(ktFileRepositoryMsg.getFileName());
                } else {
                    ktDispatcher.setDpName((dpObjList.size()+1) + "_" + ktFileRepositoryMsg.getFileName() );
                }

                if (BusiConstant.KETTLE_TYPE_TRANS.equals(FileUtils.getSuffixName(filePath))) {
                    ktDispatcher.setDpType(BusiConstant.KETTLE_TYPE_FLAG_TRANS);
                } else if (BusiConstant.KETTLE_TYPE_JOB.equals(FileUtils.getSuffixName(filePath))) {
                    ktDispatcher.setDpType(BusiConstant.KETTLE_TYPE_FLAG_JOB);
                } else {
                    // 文件不是ktr文件或者kjb文件
                    return -1;
                }

                KtDispatcher insertObj = new KtDispatcher();
                try {
                    BeanUtils.copyProperties(insertObj, ktDispatcher);
                    // 设置调用地址
                    insertObj.setDpPath(filePath);
                    insertObj.setDpId(_dpId);
                    ktDispatcherMapper.insertKtDispatcher(insertObj);
                } catch (Exception e) {
                    logger.error("cory ktdispatcher出错",e);
                    return -1;
                }
                resultCount ++;
            }
        }else if (BusiConstant.REPOSITORY_TYPE_DB.equals(ktDispatcher.getDpRepoType())) {
            // 数据库资源仓库
            KettleDatabaseRepository repository = (KettleDatabaseRepository) KettleEnvironmentStarter.instance.getRepository();

            try {
                if (StringUtils.isNotEmpty(ktDispatcher.getDpIds())) {
                    int r = 0;
                    for (String ignored : StringUtils.split(ktDispatcher.getDpIds(), ',')) {
                        Long dpId = Long.parseLong(ignored);
                        if (ktDispatcher.getDpType().equals(BusiConstant.KETTLE_TYPE_FLAG_TRANS)) {
                            TransMeta transMeta = repository.loadTransformation(new LongObjectId(dpId), "");
                            ktDispatcher.setDpId(dpId);
                            ktDispatcher.setDpType(BusiConstant.KETTLE_TYPE_FLAG_TRANS);
                            ktDispatcher.setDpRepoType(BusiConstant.REPOSITORY_TYPE_DB);
                            ktDispatcher.setDpName(transMeta.getName());
                            ktDispatcher.setStatus(BusiConstant.STATUS_YES);
                            ktDispatcher.setCreatedTime(new Date());
                            ktDispatcher.setCreatedBy(ShiroUtils.getUserId());
                            ktDispatcher.setDpPath(transMeta.getPathAndName());
                        } else {
                            String path = repository.findDirectory(new LongObjectId(dpId)).getPath() +"/";
//                            filePath = path + repository.loadJob(new LongObjectId(dpId), "").getName();
                            JobMeta jobMeta = repository.loadJob(new LongObjectId(dpId), "");
                            ktDispatcher.setDpType(BusiConstant.KETTLE_TYPE_FLAG_JOB);
                            ktDispatcher.setDpRepoType(BusiConstant.REPOSITORY_TYPE_DB);
                            ktDispatcher.setDpId(dpId);
                            ktDispatcher.setDpName(jobMeta.getName());
                            ktDispatcher.setStatus(BusiConstant.STATUS_YES);
                            ktDispatcher.setCreatedTime(new Date());
                            ktDispatcher.setCreatedBy(ShiroUtils.getUserId());
                            ktDispatcher.setDpPath(path+jobMeta.getName());
                        }
                        ktDispatcherMapper.insertKtDispatcher(ktDispatcher);
                        resultCount++;
                    }
                }
            } catch (Exception e) {
                logger.error("新增DB调度失败", e);
                return 0;
            }
        } else {
            return 0;
        }

        return resultCount;
    }



    /**
     * 修改任务调度
     * 
     * @param ktDispatcher 任务调度
     * @return 结果
     */
    @Override
    public int updateKtDispatcher(KtDispatcher ktDispatcher)
    {
        if (BusiConstant.REPOSITORY_TYPE_GIT.equals(ktDispatcher.getDpRepoType())) {
            // git资源仓库
            KtRepositoryFile file = fileService.selectKtRepositoryFileById(ktDispatcher.getDpId());
            KtGitRepositoryFileParser fileParser = new KtGitRepositoryFileParser(fileService, file.getRepositoryId(), null);
            String filePath = fileParser.getFilePath(file);
            ktDispatcher.setDpPath(filePath);
        } else if (BusiConstant.REPOSITORY_TYPE_FILE.equals(ktDispatcher.getDpRepoType())) {
            // 文件资源仓库
            KtFileRepositoryMsg ktFileRepositoryMsg = fileRepositoryMsgService.selectKtFileRepositoryMsgById(ktDispatcher.getDpId());
            KtFileRepositoryFileParser fileParser = new KtFileRepositoryFileParser(fileRepositoryMsgService, versionService, ktFileRepositoryMsg.getfRepoId(), FILE_REPOSITORY_LOCATION);
            String filePath = fileParser.getFilePath(ktFileRepositoryMsg);
            ktDispatcher.setDpPath(filePath);
        }else if (BusiConstant.REPOSITORY_TYPE_DB.equals(ktDispatcher.getDpRepoType())) {
            // 处理repo为数据库类型
        } else {
            return 0;
        }
        return ktDispatcherMapper.updateKtDispatcher(ktDispatcher);
    }

    /**
     * 批量删除任务调度
     * 
     * @param ids 需要删除的任务调度主键
     * @return 结果
     */
    @Override
    public int deleteKtDispatcherByIds(String ids)
    {
        return ktDispatcherMapper.deleteKtDispatcherByIds(Convert.toStrArray(ids));
    }

    /**
     * 删除任务调度信息
     * 
     * @param id 任务调度主键
     * @return 结果
     */
    @Override
    public int deleteKtDispatcherById(Long id)
    {
        return ktDispatcherMapper.deleteKtDispatcherById(id);
    }

    @Autowired
    private IKtRepositoryService repositoryService;

    @Autowired
    private IKtFileRepositoryService fileRepositoryService;

    @Autowired
    private IKtRepositoryFileService fileService;

    @Autowired
    private IKtFileRepositoryMsgService fileRepositoryMsgService;

    @Autowired
    private IKtParamsService iKtParamsService;

    @Autowired
    private IKtRepositoryNodeService nodeService;

    @Value("${kt-watchdog.DES_SECURITY_KEY}")
    private String desSecurityKey;

    @Override
    public int launchGJob(String ids) throws KettleMissingPluginsException, KettleXMLException {
        return -1;
    }

    public int launchFJob(String ids, Integer jobId) throws KettleMissingPluginsException, KettleXMLException {
        if (jobId != null) {
            Long _id = Long.parseLong(jobId.toString());
            return launchFJob(ids, _id);
        }

        return -1;
    }

    @Override
    public int launchFJob(String ids, Long jobId) throws KettleMissingPluginsException, KettleXMLException {
        List<KtDispatcher> dispatchers = ktDispatcherMapper.selectKtDispatcherByIdGs(Convert.toStrArray(ids));
        int count = 0;
        for (KtDispatcher dispatcher : dispatchers) {
            if (dispatcher.getStatus().equals(BusiConstant.STATUS_NO)) {
                continue;
            }

            if (dispatcher == null) {
                return -1;
            }
            // 运行参数解密
            List<KtParams> params = iKtParamsService.selectKtParamsEnableListByGroupId(dispatcher.getParamsGroupId());
            for (KtParams param : params) {
                if (param.getEncrypt().equals(BusiConstant.DES_ENCRYPT_YES)) {
                    try {
                        param.setParamsValue(DESEncrypt.decrypt(param.getParamsValue(), desSecurityKey));
                    } catch (Exception e) {
                        return -1;
                    }
                }
            }

            if (dispatcher.getDpRepoType().equals(BusiConstant.REPOSITORY_TYPE_GIT)) {
                switch (dispatcher.getDpType()) {
                    // JOB
                    case BusiConstant.KETTLE_TYPE_FLAG_JOB:
                        startGJob(dispatcher,
                                repositoryService.selectKtRepositoryByRepositoryId(dispatcher.getRepositoryId()),
                                fileService.selectKtRepositoryFileById(dispatcher.getDpId()),
                                params,
                                nodeService.selectKtRepositoryNodeByNodeId(dispatcher.getCarteId()), jobId);
                        break;
                    // trans
                    case BusiConstant.KETTLE_TYPE_FLAG_TRANS:
                        startGTrans(dispatcher,
                                repositoryService.selectKtRepositoryByRepositoryId(dispatcher.getRepositoryId()),
                                fileService.selectKtRepositoryFileById(dispatcher.getDpId()),
                                params,
                                nodeService.selectKtRepositoryNodeByNodeId(dispatcher.getCarteId()), jobId);
                        break;
                    default:
                        continue;
                }
            } else if (dispatcher.getDpRepoType().equals(BusiConstant.REPOSITORY_TYPE_FILE)) {
                switch (dispatcher.getDpType()) {
                    case BusiConstant.KETTLE_TYPE_FLAG_JOB:
                        startFJob(dispatcher,
                                fileRepositoryService.selectKtFileRepositoryById(dispatcher.getRepositoryId()),
                                fileRepositoryMsgService.selectKtFileRepositoryMsgById(dispatcher.getDpId()),
                                params,
                                nodeService.selectKtRepositoryNodeByNodeId(dispatcher.getCarteId()), jobId);
                        break;
                    case BusiConstant.KETTLE_TYPE_FLAG_TRANS:
                        startFTrans(dispatcher,
                                fileRepositoryService.selectKtFileRepositoryById(dispatcher.getRepositoryId()),
                                fileRepositoryMsgService.selectKtFileRepositoryMsgById(dispatcher.getDpId()),
                                params,
                                nodeService.selectKtRepositoryNodeByNodeId(dispatcher.getCarteId()), jobId);
                        break;
                    default:
                        continue;
                }
            } else if (dispatcher.getDpRepoType().equals(BusiConstant.REPOSITORY_TYPE_DB)) {
                switch (dispatcher.getDpType()) {
                    case BusiConstant.KETTLE_TYPE_FLAG_JOB:
                        try {
                            startDBJob(dispatcher,
                                    params,
                                    nodeService.selectKtRepositoryNodeByNodeId(dispatcher.getCarteId()), jobId);
                        } catch (KettleException e) {
                            logger.error("出错", e);
                            continue;
                        }
                        break;
                    case BusiConstant.KETTLE_TYPE_FLAG_TRANS:
                        try {
                            startDBTrans(dispatcher,
                                    params,
                                    nodeService.selectKtRepositoryNodeByNodeId(dispatcher.getCarteId()), jobId);
                        } catch (KettleException e) {
                            logger.error("出错", e);
                            continue;
                        }
                        break;
                    default:
                        continue;
                }
            }
            count ++;
        }
        return count;
    }

    @Override
    public int launchDBJob(String ids) throws KettleMissingPluginsException, KettleXMLException {
        return -1;
    }

    // 使用线程池执行trans任务
    private void startDBTrans(KtDispatcher dispatcher, List<KtParams> params, KtRepositoryNode node,Long jobId) throws KettleException {
        KettleDatabaseRepository repository = (KettleDatabaseRepository) KettleEnvironmentStarter.instance.getRepository();

        TransMeta transMeta = repository.loadTransformation(new LongObjectId(dispatcher.getDpId()), dispatcher.getDpName());
        transMeta.setName(dispatcher.getDpName());
        transMeta.setObjectId(new StringObjectId(String.valueOf(dispatcher.getId())));
        transMeta.setRepository(repository);
        TransExecutor executor = new TransExecutor(decodeTrans(params, node), transMeta,dispatcher.getDpRepoType(), jobId);
        KettleThreadPool.getInstance().run(executor);
    }
    // 使用线程池执行job任务
    private void startDBJob(KtDispatcher dispatcher, List<KtParams> params, KtRepositoryNode node,Long jobId) throws KettleException {
        KettleDatabaseRepository repository = (KettleDatabaseRepository) KettleEnvironmentStarter.instance.getRepository();
        JobMeta jobMeta = repository.loadJob(new LongObjectId(dispatcher.getDpId()), dispatcher.getDpName());
        jobMeta.setName(dispatcher.getDpName());
        jobMeta.setObjectId(new StringObjectId(String.valueOf(dispatcher.getId())));
        jobMeta.setRepository(repository);
        JobExecutor executor = new JobExecutor(decodeJob(params, node), jobMeta,dispatcher.getDpRepoType(), jobId);
        KettleThreadPool.getInstance().run(executor);
    }

    @Override
    public List<KtDispatcher> selectAllEnable() {
        return ktDispatcherMapper.selectAllEnable();
    }

    @Override
    public KtDispatcher selectKtDispatcherByDpName(String dpName) {
        return ktDispatcherMapper.selectKtDispatcherByDpName(dpName);
    }

    @Autowired
    private IKtFileVersionService versionService;
    // 使用线程池执行trans任务
    private void startFTrans(KtDispatcher dispatcher, KtFileRepository fRepo, KtFileRepositoryMsg msg, List<KtParams> params, KtRepositoryNode node,Long jobId) throws KettleMissingPluginsException, KettleXMLException {
        KtFileRepositoryFileParser fileParser = new KtFileRepositoryFileParser(fileRepositoryMsgService, versionService, fRepo.getId(), FILE_REPOSITORY_LOCATION);
        String filePath = FILE_REPOSITORY_LOCATION + fileParser.getFilePath(msg);
        if (!FileUtils.exists(filePath)) {
            throw new RuntimeException("文件未找到【"+filePath+"]");
        }

        TransMeta meta = new TransMeta(filePath);
        meta.setName(dispatcher.getDpName());
        meta.setObjectId(new StringObjectId(String.valueOf(dispatcher.getId())));
        TransExecutor executor = new TransExecutor(decodeTrans(params, node), meta,dispatcher.getDpRepoType(), jobId );
        KettleThreadPool.getInstance().run(executor);
    }
    // 使用线程池执行job任务
    private void startFJob(KtDispatcher dispatcher, KtFileRepository fRepo, KtFileRepositoryMsg msg, List<KtParams> params, KtRepositoryNode node,Long jobId) throws KettleXMLException {
        KtFileRepositoryFileParser fileParser = new KtFileRepositoryFileParser(fileRepositoryMsgService, versionService, fRepo.getId(), FILE_REPOSITORY_LOCATION);
        String filePath = FILE_REPOSITORY_LOCATION + fileParser.getFilePath(msg);
        if (!FileUtils.exists(filePath)) {
            throw new RuntimeException("文件未找到【"+filePath+"]");
        }

        JobMeta meta = new JobMeta(filePath, null);
        meta.setName(dispatcher.getDpName());
        meta.setObjectId(new StringObjectId(String.valueOf(dispatcher.getId())));
        JobExecutor executor = new JobExecutor(decodeJob( params, node), meta, dispatcher.getDpRepoType(), jobId);
        KettleThreadPool.getInstance().run(executor);
    }

    @Value("${kt-watchdog.git_file_repository_location}")
    private String GIT_FILE_REPOSITORY_LOCATION;

    @Value("${kt-watchdog.file_repository_location}")
    private String FILE_REPOSITORY_LOCATION;
    // 使用线程池执行任务
    public boolean startGJob(KtDispatcher dispatcher, KtRepository repo, KtRepositoryFile file, List<KtParams> params, KtRepositoryNode node,Long jobId) throws KettleXMLException {

        KtGitRepositoryFileParser fileParser = new KtGitRepositoryFileParser(fileService, repo.getRepositoryId(), null);
        String filePath = GIT_FILE_REPOSITORY_LOCATION + fileParser.getFilePath(file);
        if (!FileUtils.exists(filePath)) {
            return false;
        }

        JobMeta meta = new JobMeta(filePath, null);
        meta.setName(dispatcher.getDpName());
        meta.setObjectId(new StringObjectId(String.valueOf(dispatcher.getId())));
        JobExecutor executor = new JobExecutor(decodeJob( params, node), meta, dispatcher.getDpRepoType(), jobId);
        KettleThreadPool.getInstance().run(executor);
        return true;
    }
    // 使用线程池执行任务
    public boolean startGTrans(KtDispatcher dispatcher, KtRepository repo, KtRepositoryFile file, List<KtParams> params, KtRepositoryNode node,Long jobId) throws KettleMissingPluginsException, KettleXMLException {
        KtGitRepositoryFileParser fileParser = new KtGitRepositoryFileParser(fileService, repo.getRepositoryId(), null);
        String filePath = GIT_FILE_REPOSITORY_LOCATION + fileParser.getFilePath(file);
        if (!FileUtils.exists(filePath)) {
            return false;
        }

        TransMeta meta = new TransMeta(filePath);
        meta.setName(dispatcher.getDpName());
        meta.setObjectId(new StringObjectId(String.valueOf(dispatcher.getId())));
        TransExecutor executor = new TransExecutor(decodeTrans(params, node), meta,dispatcher.getDpRepoType(), jobId);
        KettleThreadPool.getInstance().run(executor);
        return true;
    }

    public static Logger logger = LoggerFactory.getLogger(KtDispatcherServiceImpl.class);

    public TransExecutionConfiguration decodeTrans(List<KtParams> params, KtRepositoryNode node) {
        TransExecutionConfiguration transExecutionConfiguration = new TransExecutionConfiguration();
        transExecutionConfiguration.setRemoteServer(getSlaveServer(node));
        Map<String, String> vars = getMapVar(params);
        logger.info("trans运行参数：{}", vars);
        transExecutionConfiguration.setParams(vars);
        transExecutionConfiguration.setArguments(vars);
        transExecutionConfiguration.setVariables(vars);
        logger.info("trans运行参数。params={}, arguments={}", transExecutionConfiguration.getParams(), transExecutionConfiguration.getArguments());
        transExecutionConfiguration.setExecutingRemotely(true);
        transExecutionConfiguration.setExecutingLocally(true);
        return transExecutionConfiguration;
    }

    public JobExecutionConfiguration decodeJob(List<KtParams> params, KtRepositoryNode node) {
        JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
        jobExecutionConfiguration.setRemoteServer(getSlaveServer(node));
        Map<String, String> vars = getMapVar(params);
        logger.info("job运行参数：{}", vars);
        jobExecutionConfiguration.setParams(vars);
        jobExecutionConfiguration.setArguments(vars);
        jobExecutionConfiguration.setVariables(vars);
        jobExecutionConfiguration.setExecutingRemotely(true);
        jobExecutionConfiguration.setExecutingLocally(false);
//        jobExecutionConfiguration.setRepository(KettleEnvironmentStarter.instance.repository);
        logger.info("job运行参数。params={}, arguments={}", jobExecutionConfiguration.getParams(), jobExecutionConfiguration.getArguments());
        return jobExecutionConfiguration;
    }

    private Map<String, String> getMapVar(List<KtParams> params) {
        Map<String, String> map = new HashMap<>();
        for (KtParams p : params) {
            map.put(p.getParamsKey(), p.getParamsValue());
        }
        return map;
    }

    public SlaveServer getSlaveServer(KtRepositoryNode node) {
        SlaveServer slaveServer = new SlaveServer();
        slaveServer.setName(node.getName());
        slaveServer.setHostname(node.getNodeHost());
        slaveServer.setMaster(true);
        slaveServer.setPort(node.getNodePort());
        slaveServer.setUsername(node.getNodeUsername());
        slaveServer.setPassword(node.getNodePassword());
        slaveServer.setObjectId(new StringObjectId(String.valueOf(node.getNodeId())));
        return slaveServer;
    }

    public boolean warnIdSelected(String id, String ids) {
        if (StringUtils.isNotEmpty(ids) && StringUtils.isNotEmpty(id)) {
            String[] split = ids.split(",");
            for (String s: split) {
                if (StringUtils.isNotEmpty(s) && id.equals(s)) {
                    return true;
                }
            }
        }

        return false;
    }

    @Override
    public KtDispatcher selectKtDispatcherForMonitor(KtDispatcher dispatcher) {
        Map<String, Object> stringObjectMap = ktDispatcherMapper.selectKtDispatcherForMonitor(dispatcher);
        Object currentStatus = stringObjectMap.get("currentStatus");
        Object exeCount = stringObjectMap.get("exeCount");
        Object exeSuccessCount = stringObjectMap.get("exeSuccessCount");
        Object exeFailedCount = stringObjectMap.get("exeFailedCount");
        dispatcher.setCurrentStatus((String) currentStatus);
        dispatcher.setExeCount(Long.parseLong(exeCount.toString()) );
        dispatcher.setExeSuccessCount(Long.parseLong(exeSuccessCount.toString()) );
        dispatcher.setExeFailedCount(Long.parseLong(exeFailedCount.toString()));
        return dispatcher;
    }

    public static void main(String[] args) {

    }
}
